home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_poll.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2005-10-18  |  5KB  |  190 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. import sys
  5. import os
  6. import select
  7. import random
  8. from test.test_support import verify, verbose, TestSkipped, TESTFN
  9.  
  10. try:
  11.     select.poll
  12. except AttributeError:
  13.     raise TestSkipped, 'select.poll not defined -- skipping test_poll'
  14.  
  15.  
  16. def find_ready_matching(ready, flag):
  17.     match = []
  18.     for fd, mode in ready:
  19.         if mode & flag:
  20.             match.append(fd)
  21.             continue
  22.     
  23.     return match
  24.  
  25.  
  26. def test_poll1():
  27.     '''Basic functional test of poll object
  28.  
  29.     Create a bunch of pipe and test that poll works with them.
  30.     '''
  31.     print 'Running poll test 1'
  32.     p = select.poll()
  33.     NUM_PIPES = 12
  34.     MSG = ' This is a test.'
  35.     MSG_LEN = len(MSG)
  36.     readers = []
  37.     writers = []
  38.     r2w = { }
  39.     w2r = { }
  40.     for i in range(NUM_PIPES):
  41.         (rd, wr) = os.pipe()
  42.         p.register(rd, select.POLLIN)
  43.         p.register(wr, select.POLLOUT)
  44.         readers.append(rd)
  45.         writers.append(wr)
  46.         r2w[rd] = wr
  47.         w2r[wr] = rd
  48.     
  49.     while writers:
  50.         ready = p.poll()
  51.         ready_writers = find_ready_matching(ready, select.POLLOUT)
  52.         if not ready_writers:
  53.             raise RuntimeError, 'no pipes ready for writing'
  54.         
  55.         wr = random.choice(ready_writers)
  56.         os.write(wr, MSG)
  57.         ready = p.poll()
  58.         ready_readers = find_ready_matching(ready, select.POLLIN)
  59.         if not ready_readers:
  60.             raise RuntimeError, 'no pipes ready for reading'
  61.         
  62.         rd = random.choice(ready_readers)
  63.         buf = os.read(rd, MSG_LEN)
  64.         verify(len(buf) == MSG_LEN)
  65.         print buf
  66.         os.close(r2w[rd])
  67.         os.close(rd)
  68.         p.unregister(r2w[rd])
  69.         p.unregister(rd)
  70.         writers.remove(r2w[rd])
  71.     poll_unit_tests()
  72.     print 'Poll test 1 complete'
  73.  
  74.  
  75. def poll_unit_tests():
  76.     FD = 42
  77.     
  78.     try:
  79.         os.close(FD)
  80.     except OSError:
  81.         pass
  82.  
  83.     p = select.poll()
  84.     p.register(FD)
  85.     r = p.poll()
  86.     verify(r[0] == (FD, select.POLLNVAL))
  87.     f = open(TESTFN, 'w')
  88.     fd = f.fileno()
  89.     p = select.poll()
  90.     p.register(f)
  91.     r = p.poll()
  92.     verify(r[0][0] == fd)
  93.     f.close()
  94.     r = p.poll()
  95.     verify(r[0] == (fd, select.POLLNVAL))
  96.     os.unlink(TESTFN)
  97.     p = select.poll()
  98.     
  99.     try:
  100.         p.register(p)
  101.     except TypeError:
  102.         pass
  103.  
  104.     print 'Bogus register call did not raise TypeError'
  105.     
  106.     try:
  107.         p.unregister(p)
  108.     except TypeError:
  109.         pass
  110.  
  111.     print 'Bogus unregister call did not raise TypeError'
  112.     p = select.poll()
  113.     
  114.     try:
  115.         p.unregister(3)
  116.     except KeyError:
  117.         pass
  118.  
  119.     print 'Bogus unregister call did not raise KeyError'
  120.     pollster = select.poll()
  121.     
  122.     class Nope:
  123.         pass
  124.  
  125.     
  126.     class Almost:
  127.         
  128.         def fileno(self):
  129.             return 'fileno'
  130.  
  131.  
  132.     
  133.     try:
  134.         pollster.register(Nope(), 0)
  135.     except TypeError:
  136.         pass
  137.  
  138.     print 'expected TypeError exception, not raised'
  139.     
  140.     try:
  141.         pollster.register(Almost(), 0)
  142.     except TypeError:
  143.         pass
  144.  
  145.     print 'expected TypeError exception, not raised'
  146.  
  147.  
  148. def test_poll2():
  149.     print 'Running poll test 2'
  150.     cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
  151.     p = os.popen(cmd, 'r')
  152.     pollster = select.poll()
  153.     pollster.register(p, select.POLLIN)
  154.     for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,) * 10:
  155.         if verbose:
  156.             print 'timeout =', tout
  157.         
  158.         fdlist = pollster.poll(tout)
  159.         if fdlist == []:
  160.             continue
  161.         
  162.         (fd, flags) = fdlist[0]
  163.         if flags & select.POLLHUP:
  164.             line = p.readline()
  165.             if line != '':
  166.                 print 'error: pipe seems to be closed, but still returns data'
  167.                 continue
  168.             continue
  169.             continue
  170.         if flags & select.POLLIN:
  171.             line = p.readline()
  172.             if verbose:
  173.                 print repr(line)
  174.             
  175.             if not line:
  176.                 if verbose:
  177.                     print 'EOF'
  178.                 
  179.                 break
  180.                 continue
  181.             continue
  182.             continue
  183.         print 'Unexpected return value from select.poll:', fdlist
  184.     
  185.     p.close()
  186.     print 'Poll test 2 complete'
  187.  
  188. test_poll1()
  189. test_poll2()
  190.